#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import platform
import random
import shutil
import subprocess
from pydoc import classname
import time
import os
import sys
import datetime
import xml.etree.ElementTree as ElementTree

from core.constants import SchedulerType
from xdevice import Plugin
from xdevice import get_plugin
from xdevice import platform_logger
from xdevice import Scheduler
from xdevice import DeviceTestType
from core.utils import get_build_output_path
from core.utils import scan_support_product
from core.utils import is_lite_product
from core.common import is_open_source_product
from core.command.parameter import Parameter
from core.command.distribute_execute import DbinderTest
from core.testcase.testcase_manager import TestCaseManager
from core.config.config_manager import UserConfigManager
from core.config.parse_parts_config import ParsePartsConfig
from core.config.resource_manager import ResourceManager

LOG = platform_logger("Run")


class Run(object):

    history_cmd_list = []
    
    @classmethod
    def get_history(cls):
        """Return the class-level list of recorded command history entries.

        The returned object is the shared ``history_cmd_list`` itself, not a
        copy, so changes made by the caller are visible to the class.
        """
        return cls.history_cmd_list

    @classmethod
    def get_target_out_path(cls, product_form):
        """Return the absolute target output directory for *product_form*.

        The user-configured test case directory is used when it is not an
        empty string; otherwise the path falls back to
        ``<build output>/packages/<product_form>``.
        """
        configured_dir = UserConfigManager().get_test_cases_dir()
        if configured_dir != "":
            return os.path.abspath(configured_dir)

        # Nothing configured: derive the location from the build output.
        build_out = get_build_output_path(product_form)
        return os.path.abspath(
            os.path.join(build_out, "packages", product_form))

    @classmethod
    def _build_test_cases(cls, options):
        """Build the test cases when the user configuration asks for it.

        Returns True when no build is needed (coverage run, build flag not
        set, or empty source root); otherwise returns the result of
        ``BuildManager.build_testcases``.
        """
        if options.coverage:
            LOG.info("Coverage testing, no need to compile testcases")
            return True

        build_enabled = UserConfigManager().get_user_config_flag(
            "build", "testcase")
        source_root = sys.source_code_root_path
        if not build_enabled or source_root == "":
            return True

        # Imported here, as in the rest of this file, rather than at module
        # level.
        from core.build.build_manager import BuildManager
        return BuildManager().build_testcases(source_root, options)

    @classmethod
    def _check_test_dictionary(cls, test_dictionary):
        """Tell whether the test dictionary contains at least one test file.

        Args:
            test_dictionary: mapping whose values are sized collections of
                test files.

        Returns:
            True if any value has a length greater than zero, False
            otherwise (including for an empty mapping).
        """
        # Key order is irrelevant for an "any non-empty" check, so the values
        # are scanned directly instead of sorting the keys first.
        return any(len(file_list) > 0
                   for file_list in test_dictionary.values())

    @classmethod
    def get_tests_out_path(cls, product_form):
        """Return the directory that holds the built test cases.

        The user-configured test case directory is returned unchanged when
        it is not an empty string. Otherwise the path is
        ``<build output>/tests``; it is made absolute only when
        *product_form* is one of the products reported by
        ``scan_support_product``.

        Args:
            product_form: product name used to look up the build output.

        Returns:
            The test case directory path as a string.
        """
        testcase_path = UserConfigManager().get_test_cases_dir()
        if testcase_path == "":
            is_supported = product_form in scan_support_product()
            testcase_path = os.path.join(
                get_build_output_path(product_form), "tests")
            # Open-source and other supported products use the same
            # location, so no further distinction is made between them.
            if is_supported:
                testcase_path = os.path.abspath(testcase_path)
        LOG.info("testcase_path=%s" % testcase_path)
        return testcase_path

    @classmethod
    def get_xts_tests_out_path(cls, product_form, testtype):
        """Return the directory that holds the XTS test cases.

        Uses the user-configured test case directory when it is not empty;
        otherwise the absolute form of
        ``<build output>/suites/<testtype[0]>/testcases``. Only the first
        entry of *testtype* is used.
        """
        xts_dir = UserConfigManager().get_test_cases_dir()
        if xts_dir == "":
            suite_dir = os.path.join(
                get_build_output_path(product_form),
                "suites",
                testtype[0],
                "testcases")
            xts_dir = os.path.abspath(suite_dir)
        LOG.info("xts_testcase_path=%s" % xts_dir)
        return xts_dir

    @classmethod
    def get_external_deps_out_path(cls, product_form):
        """Return the absolute path of ``part_deps_info.json``.

        The file is located under ``<build output>/part_deps_info`` for the
        given *product_form*; the resolved path is logged before returning.
        """
        build_out = get_build_output_path(product_form)
        deps_info_file = os.path.abspath(
            os.path.join(build_out, "part_deps_info", "part_deps_info.json"))
        LOG.info("external_deps_path=%s" % deps_info_file)
        return deps_info_file

    @classmethod
    def get_coverage_outpath(cls, options):
        """Return the coverage output path, or "" when coverage is off.

        For coverage runs the build output path of ``options.productform``
        is preferred; if that is empty, the ``outpath`` entry of the user
        "coverage" configuration is used. An error is logged when the path
        is still empty.
        """
        if not options.coverage:
            return ""

        out_path = get_build_output_path(options.productform)
        if out_path == "":
            coverage_config = UserConfigManager().get_user_config("coverage")
            out_path = coverage_config.get("outpath", "")
        if out_path == "":
            LOG.error("Coverage test: coverage_outpath is empty.")
        return out_path

    @classmethod
    def get_part_deps_list(cls, productform, testpart):
        """Return the part dependency files for *testpart*.

        Resolves the path of the pre-processed inter-part dependency build
        result for *productform* and passes it, together with *testpart*, to
        ``TestCaseManager.get_part_deps_files``.
        """
        deps_info_file = cls.get_external_deps_out_path(productform)
        return TestCaseManager().get_part_deps_files(deps_info_file, testpart)
        
    def process_command_run(self, command, options):
        """Execute a "run" command: prepare, run the tests and post-process.

        The method, in order:
          1. on non-Windows coverage runs, invokes the local coverage helper
             scripts (push coverage so / init gcov) when they exist;
          2. normalizes ``options.testtype``, resolves the part name list and
             the coverage output path, and logs the input parameters;
          3. handles the history options (``historylist``, ``runhistory``)
             and the ``retry`` option;
          4. builds the test cases, collects the test files and hands them
             either to the distributed-test flow or to the scheduler plugin;
          5. records the command in ``history_cmd_list`` (at most 10 entries
             are kept) and runs the fuzz corpus / coverage post-processing.

        Args:
            command: command object forwarded unchanged to
                ``scheduler.exec_command`` and stored in the history record.
            options: parsed option object. It is read and also mutated here
                (``testtype``, ``partname_list``, ``coverage_outpath``,
                ``testdict``, ``target_outpath``, ``testcases_path``,
                ``resource_path``).

        Returns:
            None. Invalid input and failed preparation steps are logged and
            end the method early.
        """
        # The raw command is re-joined with commas so that it can be handed
        # to the coverage helper scripts as one "command_str=" argument.
        current_raw_cmd = ",".join(list(map(str, options.current_raw_cmd.split(" "))))
        if options.coverage and platform.system() != "Windows":
            if not options.pullgcda:
                push_cov_path = os.path.join(sys.framework_root_dir, "local_coverage/push_coverage_so/push_coverage.py")
                if os.path.exists(push_cov_path):
                    if str(options.testpart) == "[]" and str(options.subsystem) == "[]":
                        LOG.info("No subsystem or part input. Not push coverage so.")
                    elif str(options.testpart) != "[]" and str(options.subsystem) != "[]":
                        LOG.info("Subsystem or part, there can be only one parameter exist. Not push coverage so.")
                    else:
                        # Exactly one of testpart / subsystem was supplied.
                        # NOTE(review): the command is built as a shell string
                        # from command-line values - confirm they are trusted.
                        if str(options.testpart) != "[]":
                            param = str(options.testpart)
                            subprocess.run("python3 {} {} {}".format(
                                push_cov_path, "testpart", param), shell=True)
                        else:
                            param = str(options.subsystem)
                            subprocess.run("python3 {} {} {}".format(
                                push_cov_path, "subsystem", param), shell=True)
                else:
                    print(f"{push_cov_path} not exists.")

            init_gcov_path = os.path.join(sys.framework_root_dir, "local_coverage/resident_service/init_gcov.py")
            if os.path.exists(init_gcov_path):
                subprocess.run("python3 %s command_str=%s" % (
                    init_gcov_path, current_raw_cmd), shell=True)
            else:
                print(f"{init_gcov_path} not exists.")

        para = Parameter()
        test_type_list = para.get_testtype_list(options.testtype)
        if len(test_type_list) == 0:
            LOG.error("The testtype parameter is incorrect.")
            return
        options.testtype = test_type_list

        parser = ParsePartsConfig(options.productform)
        partname_list = parser.get_part_list(
            options.subsystem,
            options.testpart)
        options.partname_list = partname_list
        options.coverage_outpath = self.get_coverage_outpath(options)

        LOG.info("")
        LOG.info("------------------------------------")
        LOG.info("Input parameter:")
        LOG.info("productform   = %s" % options.productform)
        LOG.info("testtype      = %s" % str(options.testtype))
        LOG.info("subsystem     = %s" % str(options.subsystem))
        LOG.info("testpart      = %s" % str(options.testpart))
        LOG.info("testmodule    = %s" % options.testmodule)
        LOG.info("testsuit      = %s" % options.testsuit)
        LOG.info("testcase      = %s" % options.testcase)
        LOG.info("testlevel     = %s" % options.testlevel)
        LOG.info("testargs     = %s" % options.testargs)
        LOG.info("repeat     = %s" % options.repeat)
        LOG.info("retry         = %s" % options.retry)
        LOG.info("historylist   = %s" % options.historylist)
        LOG.info("runhistory   = %s" % options.runhistory)
        LOG.info("partname_list = %s" % str(options.partname_list))
        LOG.info("partdeps = %s" % options.partdeps)
        LOG.info("------------------------------------")
        LOG.info("")

        if not para.check_run_parameter(options):
            LOG.error("Input parameter is incorrect.")
            return

        current_time = datetime.datetime.now()
        # Record the command run history.
        need_record_history = False
        cmd_record = {
            "time" : str(current_time),
            "raw_cmd" : options.current_raw_cmd,
            "result" : "unknown",
            "command": command,
            "options": options
        }
        # History listing, history re-run and retry commands are themselves
        # not added to the history.
        if not ("-hl" in options.current_raw_cmd or "-rh" in options.current_raw_cmd \
            or "--retry" in options.current_raw_cmd):
            need_record_history = True

        # Print the history records.
        if options.historylist:
            print("The latest command history is: %d" % len(self.history_cmd_list))
            # A separate loop variable keeps cmd_record (built above) intact.
            for index, history_record in enumerate(self.history_cmd_list):
                print("%d. [%s] - [%s]::[%s]" % (index + 1, history_record["time"],
                      history_record["raw_cmd"], history_record["result"]))
            return
        # Re-run one command taken from the history.
        if options.runhistory > 0:
            # An index above 10 (the history capacity) or above the number of
            # stored records is invalid.
            if options.runhistory > 10 or options.runhistory > len(self.history_cmd_list):
                print("input history command[%d] out of range" % options.runhistory)
                return
            history_record = self.history_cmd_list[options.runhistory - 1]
            print("run history command:", history_record["raw_cmd"])
            need_record_history = False
            # From here on the recorded command/options replace the current ones.
            command = history_record["command"]
            options = history_record["options"]

        if options.retry:
            if len(self.history_cmd_list) <= 0:
                LOG.info("No history command exsit")
                return
            # Retry always refers to the most recent history entry.
            history_cmd = self.history_cmd_list[-1]
            command = history_cmd["command"]
            options = history_cmd["options"]
            from xdevice import Variables
            latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml")
            tree = ElementTree.parse(latest_report_path)
            root = tree.getroot()
            has_failed_case = 0
            test_targets = {}
            fail_list = []
            # Walk root -> child -> grandchild and collect every grandchild
            # that has a <failure> element as "classname#name".
            for child in root:
                print(child.tag, ":", child.attrib)
                for grand in child:
                    print(grand.tag, ":", grand.attrib)
                    for sub_child in grand:
                        if sub_child.tag == 'failure':
                            fail_case = grand.attrib["classname"] + "#" + grand.attrib["name"]
                            fail_list.append(fail_case)
                            has_failed_case += 1
                            break
            test_targets["class"] = fail_list
            setattr(options, "testargs", test_targets)
            print("retry option:", options)
            if has_failed_case > 0:
                if not self._build_test_cases(options):
                    LOG.error("Build test cases failed.")
                    return
                scheduler = get_plugin(plugin_type=Plugin.SCHEDULER,
                                    plugin_id=SchedulerType.SCHEDULER)[0]
                scheduler.exec_command(command, options)
            else:
                LOG.info("No testcase to retry")
            return

        if not self._build_test_cases(options):
            LOG.error("Build test cases failed.")
            return

        if "partdeps" == options.partdeps:
            self.get_part_deps_list(options.productform, options.testpart)
            options.testcases_path = self.get_external_deps_out_path(options.productform)
            LOG.info("partdeps = %s" % options.partdeps)

        if "acts" in options.testtype or "hats" in options.testtype or "hits" in options.testtype:
            test_dict = self.get_xts_test_dict(options)
            options.testcases_path = self.get_xts_tests_out_path(options.productform, options.testtype)
            options.resource_path = self.get_xts_tests_out_path(options.productform, options.testtype)
        else:
            test_dict = self.get_test_dict(options)

        if not self._check_test_dictionary(test_dict):
            LOG.error("The test file list is empty.")
            return
        if options.coverage and platform.system() != "Windows":
            # Remove the previous coverage report directory before running.
            coverage_path = os.path.join(sys.framework_root_dir, "reports/coverage")
            if os.path.exists(coverage_path):
                coverage_process = subprocess.Popen("rm -rf %s" % coverage_path, shell=True)
                coverage_process.communicate()

        if ("distributedtest" in options.testtype and
                len(options.testtype) == 1):
            from core.command.distribute_utils import get_test_case
            from core.command.distribute_utils \
                import check_ditributetest_environment
            from core.command.distribute_utils import make_device_info_file
            from core.command.distribute_utils import make_reports

            # One timestamp, formatted twice: for the report directory name
            # and for the report start time.
            local_time = time.localtime()
            create_time = time.strftime('%Y-%m-%d-%H-%M-%S', local_time)
            start_time = time.strftime('%Y-%m-%d %H:%M:%S', local_time)

            if not check_ditributetest_environment():
                return

            output_test = get_test_case(test_dict.get("CXX", None))
            if not output_test:
                return

            result_rootpath = os.path.join(sys.framework_root_dir,
                "reports",
                create_time)

            log_path = os.path.join(result_rootpath, "log")
            tmp_path = os.path.join(result_rootpath, "temp")
            os.makedirs(log_path, exist_ok=True)
            os.makedirs(tmp_path, exist_ok=True)

            Scheduler.start_task_log(log_path)
            make_device_info_file(tmp_path)

            for case in output_test:
                agent_target_name = case["agent_target_name"]
                major_target_name = case["major_target_name"]
                manager = DbinderTest(result_rootpath, case["suits_dir"])
                manager.setUp()
                manager.test_distribute(major_target_name, agent_target_name, options)
                manager.tearDown()

            make_reports(result_rootpath, start_time)
            Scheduler.stop_task_logcat()
        else:
            options.testdict = test_dict
            options.target_outpath = self.get_target_out_path(
                options.productform)

            scheduler = get_plugin(plugin_type=Plugin.SCHEDULER,
                                   plugin_id=SchedulerType.SCHEDULER)[0]
            if scheduler is None:
                LOG.error("Can not find the scheduler plugin.")
            else:
                options.testcases_path = self.get_tests_out_path(options.productform)
                options.resource_path = os.path.abspath(os.path.join(
                    sys.framework_root_dir, "..", "resource"))
                if is_lite_product(options.productform,
                                   sys.source_code_root_path):
                    if options.productform.find("wifiiot") != -1:
                        scheduler.update_test_type_in_source(".bin",
                            DeviceTestType.ctest_lite)
                        scheduler.update_ext_type_in_source("BIN",
                            DeviceTestType.ctest_lite)
                    else:
                        print("productform is not wifiiot")
                scheduler.exec_command(command, options)
        if need_record_history:
            # Read the summary report file to obtain the run result.
            from xdevice import Variables
            latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml")
            if os.path.isfile(latest_report_path):
                with open(latest_report_path) as report_file:
                    for report_line in report_file:
                        if "testsuites name=\"summary_report\"" in report_line:
                            # Keep only the attribute part of the root tag.
                            result = report_line.replace("\n", "")
                            result = result.replace("<testsuites name=\"summary_report\" ", "")
                            result = result.replace(">", "")
                            cmd_record["result"] = result
                            break
            else:
                # No report was produced; the record keeps result "unknown"
                # instead of aborting the remaining post-processing.
                LOG.error("Summary report %s not found, history result is unknown."
                          % latest_report_path)
            # Keep at most 10 records: drop the oldest before appending.
            if len(self.history_cmd_list) >= 10:
                del self.history_cmd_list[0]
            self.history_cmd_list.append(cmd_record)

        if "fuzztest" == options.testtype[0] and options.coverage is False:
            # Replace reports/latest_corpus with the "result" directory of
            # the reporter plugin's report path.
            report = get_plugin(plugin_type=Plugin.REPORTER, plugin_id="ALL")[0]
            latest_corpus_path = os.path.join(sys.framework_root_dir, "reports", "latest_corpus")
            if os.path.exists(latest_corpus_path):
                shutil.rmtree(latest_corpus_path)
            shutil.copytree(os.path.join(report.report_path, "result"), latest_corpus_path)

        if options.coverage and platform.system() != "Windows":
            pull_service_gcov_path = os.path.join(
                sys.framework_root_dir, "local_coverage/resident_service/pull_service_gcda.py")
            if os.path.exists(pull_service_gcov_path):
                subprocess.run("python3 %s command_str=%s" % (pull_service_gcov_path, current_raw_cmd), shell=True)
            else:
                print(f"{pull_service_gcov_path} not exists.")

            if not options.pullgcda:
                cov_main_file_path = os.path.join(sys.framework_root_dir, "local_coverage/coverage_tools.py")
                testpart = ",".join(list(map(str, options.partname_list)))
                if os.path.exists(cov_main_file_path):
                    subprocess.run("python3 %s testpart=%s" % (
                        cov_main_file_path, testpart), shell=True)
                else:
                    print(f"{cov_main_file_path} not exists.")
        return

    def get_xts_test_dict(self, options):
        """Collect the XTS test files for the given options.

        Looks up the XTS test case build output directory for
        ``options.productform`` / ``options.testtype``. Returns an empty
        dict (after logging an error) when that directory does not exist,
        otherwise the result of ``TestCaseManager.get_xts_test_files``.
        """
        case_dir = self.get_xts_tests_out_path(options.productform, options.testtype)
        if os.path.exists(case_dir):
            return TestCaseManager().get_xts_test_files(case_dir, options)

        LOG.error("%s is not exist." % case_dir)
        return {}

    def get_test_dict(self, options):
        """Collect the test files for the given options.

        Looks up the test case build output directory for
        ``options.productform``. Returns an empty dict (after logging an
        error) when that directory does not exist, otherwise the result of
        ``TestCaseManager.get_test_files``.
        """
        case_dir = self.get_tests_out_path(options.productform)
        if os.path.exists(case_dir):
            return TestCaseManager().get_test_files(case_dir, options)

        LOG.error("%s is not exist." % case_dir)
        return {}
